This notebook provides an analysis of On-Time Flight Performance and Departure Delays data using GraphFrames for Apache Spark.

Connecting Apache Spark to Azure Cosmos DB accelerates your ability to solve fast-moving data science problems, where your data can be quickly persisted and retrieved using Azure Cosmos DB's DocumentDB API. With the Spark to Cosmos DB connector, you can more easily tackle scenarios including (but not limited to) blazing-fast IoT workloads, updatable columns when performing analytics, push-down predicate filtering, and advanced analytics and data science against fast-changing data in a geo-replicated, managed document store with guaranteed SLAs for consistency, availability, low latency, and throughput.
The Spark to Cosmos DB connector utilizes the Azure DocumentDB Java SDK.

The data flow is as follows:
%%configure
{ "name": "Spark-to-Cosmos_DB_Connector",
  "executorMemory": "8G",
  "executorCores": 2,
  "numExecutors": 2,
  "driverCores": 2,
  "jars": ["wasb:///example/jars/0.0.3c/azure-documentdb-1.10.0.jar", "wasb:///example/jars/0.0.3c/azure-cosmosdb-spark-0.0.3-SNAPSHOT.jar"],
  "conf": {
    "spark.jars.packages": "graphframes:graphframes:0.5.0-spark2.1-s_2.11",
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
  }
}
# Connection
flightsConfig = {
    "Endpoint": "https://doctorwho.documents.azure.com:443/",
    "Masterkey": "xWpfqUBioucC2YkWV6uHVhgZtsPIjIVmE4VDPyNYnw2QUazvCHm3rnn9AeSgglLOT3yfjCR5YbLeh5MCc3aKNw==",
    "Database": "DepartureDelays",
    "preferredRegions": "Central US",
    "Collection": "flights_pcoll",
    "SamplingRatio": "1.0",
    "schema_samplesize": "1000",
    "query_pagesize": "2147483647",
    "query_custom": "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c"
}
flights = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**flightsConfig).load()
flights.count()
flights.cache()
flights.createOrReplaceTempView("flights")
# Set File Paths
airportsnaFilePath = "wasb://data@doctorwhostore.blob.core.windows.net/airport-codes-na.txt"
# Obtain airports dataset
airportsna = spark.read.csv(airportsnaFilePath, header='true', inferSchema='true', sep='\t')
airportsna.createOrReplaceTempView("airports")
%%sql
select count(1) from flights where origin = 'LAS'
%%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY TotalDelays DESC)-1), '. '), destination) as destination, TotalDelays
from (
select a.city as destination, sum(f.delay) as TotalDelays, count(1) as Trips
from flights f
join airports a
on a.IATA = f.destination
where f.origin = 'LAS'
and f.delay > 0
group by a.city
order by sum(delay) desc limit 10
) a
%%sql
select a.city as destination, percentile_approx(f.delay, 0.5) as median_delay
from flights f
join airports a
on a.IATA = f.destination
where f.origin = 'LAS'
group by a.city
order by percentile_approx(f.delay, 0.5)
Using GraphFrames for Apache Spark to run degree and motif queries against Cosmos DB
# Build `departureDelays` DataFrame
departureDelays = spark.sql("""
select cast(f.date as int) as tripid,
       cast(concat(concat(concat(concat(concat(concat('2014-', concat(concat(substr(cast(f.date as string), 1, 2), '-')), substr(cast(f.date as string), 3, 2)), ' '), substr(cast(f.date as string), 5, 2)), ':'), substr(cast(f.date as string), 7, 2)), ':00') as timestamp) as `localdate`,
       cast(f.delay as int),
       cast(f.distance as int),
       f.origin as src,
       f.destination as dst,
       o.city as city_src,
       d.city as city_dst,
       o.state as state_src,
       d.state as state_dst
from flights f
join airports o on o.iata = f.origin
join airports d on d.iata = f.destination""")
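The nested concat/substr expression above reshapes a flight date stored as an integer in MMddHHmm form into a 2014 timestamp string. A pure-Python sketch of the same reshaping (the `zfill` here is illustrative, to restore any leading zero the integer cast would drop):

```python
def to_local_date(date_int):
    """Rebuild '2014-MM-dd HH:mm:00' from a flight date stored as MMddHHmm."""
    s = str(date_int).zfill(8)  # e.g. 1011245 -> '01011245' (Jan 1, 12:45)
    return "2014-{}-{} {}:{}:00".format(s[0:2], s[2:4], s[4:6], s[6:8])

print(to_local_date(1011245))  # -> 2014-01-01 12:45:00
```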
# Create Temporary View and cache
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()
# Note, ensure you have already installed the GraphFrames spark-package
import os
sc.addPyFile(os.path.expanduser('./graphframes_graphframes-0.5.0-spark2.1-s_2.11.jar'))
from pyspark.sql.functions import *
from graphframes import *
# Create Vertices (airports) and Edges (flights)
tripVertices = airportsna.withColumnRenamed("IATA", "id").distinct()
tripEdges = departureDelays.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")
# Cache Vertices and Edges
tripEdges.cache()
tripVertices.cache()
# Create TripGraph
tripGraph = GraphFrame(tripVertices, tripEdges)
Note, the joins are there to show the city name instead of the IATA codes. The dense_rank() code is there to help order the data correctly when viewed in Jupyter notebooks.
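To illustrate the numbering that dense_rank() produces (before the `-1` shifts it to 0-based), here is a small pure-Python sketch: ties share a rank and no gaps are left.

```python
def dense_rank_desc(values):
    """Dense rank (1-based) over values ordered descending; ties share a
    rank with no gaps, mirroring SQL's dense_rank() OVER (ORDER BY v DESC)."""
    distinct = sorted(set(values), reverse=True)
    rank_of = {v: i + 1 for i, v in enumerate(distinct)}
    return [rank_of[v] for v in values]

delays = [120, 90, 90, 45]
print(dense_rank_desc(delays))  # -> [1, 2, 2, 3]
```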
flightDelays = tripGraph.edges.filter("src = 'LAS' and delay > 0").groupBy("src", "dst").avg("delay").sort(desc("avg(delay)"))
flightDelays.createOrReplaceTempView("flightDelays")
%%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY avg_delay DESC)-1), '. '), city) as destination,
avg_delay
from (
select a.city, `avg(delay)` as avg_delay
from flightDelays f
join airports a
on f.dst = a.iata
order by `avg(delay)`
desc limit 10
) s
It would take a relatively complicated SQL statement to calculate all of the edges to a single vertex, grouped by the vertices. Instead, we can use the graph degree method.
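As a plain-Python illustration (not the GraphFrames implementation), the degree of a vertex is simply the number of edges that touch it, in either direction:

```python
from collections import Counter

def vertex_degrees(edges):
    """Count how many edges touch each vertex (in-degree + out-degree),
    which is what GraphFrame.degrees reports per vertex id."""
    degrees = Counter()
    for src, dst in edges:
        degrees[src] += 1
        degrees[dst] += 1
    return degrees

# Toy flight edges: (origin, destination)
edges = [("SEA", "SFO"), ("SFO", "SEA"), ("SEA", "JFK"), ("JFK", "SFO")]
print(vertex_degrees(edges))  # SEA and SFO each touch 3 edges, JFK touches 2
```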
airportConnections = tripGraph.degrees.sort(desc("degree"))
airportConnections.createOrReplaceTempView("airportConnections")
%%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY degree DESC)-1), '. '), city) as destination,
degree
from (
select a.city, f.degree
from airportConnections f
join airports a
on a.iata = f.id
order by f.degree desc
limit 10
) a
filteredPaths = tripGraph.bfs(
fromExpr = "id = 'SEA'",
toExpr = "id = 'SJC'",
maxPathLength = 1)
filteredPaths.show()
What about flights between SJC and BUF, i.e. direct flights, and then all the different variations of flights between San Jose and Buffalo with only one stopover in between?
filteredPaths = tripGraph.bfs(
fromExpr = "id = 'SJC'",
toExpr = "id = 'BUF'",
maxPathLength = 1)
filteredPaths.show()
filteredPaths = tripGraph.bfs(
fromExpr = "id = 'SJC'",
toExpr = "id = 'BUF'",
maxPathLength = 2)
filteredPaths.show()
commonTransferPoint = filteredPaths.groupBy("v1.id", "v1.City").count().orderBy(desc("count"))
commonTransferPoint.createOrReplaceTempView("commonTransferPoint")
%%sql
select concat(concat((dense_rank() OVER (PARTITION BY 1 ORDER BY Trips DESC)-1), '. '), city) as destination,
Trips
from (
select City, `count` as Trips from commonTransferPoint order by Trips desc limit 10
) a
Extending upon the analysis we have done up to this point, can we also predict whether a flight will be delayed, on-time, or early based on the available data?
The first thing we will do is cleanse the data and apply labels to our information (e.g. early, on-time, delayed). We will also want to remove any rows with NULL values.
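The labeling rule is straightforward; a minimal sketch of the mapping described above:

```python
def flight_status(delay):
    """Map a departure delay (minutes) to a label: negative is early,
    zero is on-time, positive is delayed."""
    if delay < 0:
        return "early"
    elif delay == 0:
        return "on-time"
    return "delayed"

print([flight_status(d) for d in (-5, 0, 12)])  # -> ['early', 'on-time', 'delayed']
```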
# This contains a generated mapping between tripid and airline
# You can get the file at https://github.com/dennyglee/databricks/blob/master/misc/trip_airline_map.csv
# For this example, the trip_airline_map.csv file has been pushed to my mounted storage container.
#tripAirlineMapFilePath = "wasb://data@doctorwhostore.blob.core.windows.net/trip_airline_map.csv"
#tripAirlineMap = spark.read.csv(tripAirlineMapFilePath, sep=",", header=True)
#tripAirlineMap.createOrReplaceTempView("tripAirlineMap")
# Prep dataset
#flightML = spark.sql("select cast(distance as double) as distance, src as origin, state_src as origin_state, dst as destination, state_dst as destination_state, concat(concat(concat(cast(tripid as string), src), dst), cast((delay + 2000) as string)) as trip_identifier, case when delay = 0 then 'on-time' when delay < 0 then 'early' else 'delayed' end as flight_status from departureDelays")
flightML = spark.sql("select cast(distance as double) as distance, src as origin, state_src as origin_state, dst as destination, state_dst as destination_state, concat(concat(concat(cast(tripid as string), src), dst), cast((delay + 2000) as string)) as trip_identifier, case when delay = 0 then 'on-time' else 'delayed' end as flight_status from departureDelays where src IN ('LAS', 'SEA')")
flightML = flightML.dropna().dropDuplicates()
flightML.createOrReplaceTempView("flightML")
# Join flights and airline information
#dataset = spark.sql("select f.distance, f.origin, f.origin_state, f.destination, f.destination_state, f.trip_identifier, f.flight_status, m.airline from flightML f join tripAirlineMap m on m.trip_identifier = f.trip_identifier")
dataset = flightML
cols = dataset.columns
dataset.printSchema()
Before we can run our various models against this data, we will first need to vectorize our data via a One-Hot Encoder (for categorical data), String Indexer (create an index based on our labeled values), and Vector Assembler.
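Conceptually, StringIndexer assigns each category an index by descending frequency, and OneHotEncoder turns that index into a binary vector. A minimal pure-Python sketch of both ideas (not the Spark ML implementation; note that Spark's OneHotEncoder additionally drops the last category by default, and this sketch breaks frequency ties alphabetically for determinism):

```python
from collections import Counter

def string_index(values):
    """Assign indices by descending frequency, like StringIndexer's
    default 'frequencyDesc' ordering."""
    freq = Counter(values)
    ordered = sorted(freq, key=lambda v: (-freq[v], v))
    idx = {v: i for i, v in enumerate(ordered)}
    return [idx[v] for v in values], idx

def one_hot(index, size):
    """Dense version of the binary vector a one-hot encoder would emit."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec

labels, mapping = string_index(["SEA", "LAS", "SEA", "SJC"])
print(labels)                            # -> [0, 1, 0, 2]; SEA is most frequent
print(one_hot(labels[1], len(mapping)))  # -> [0.0, 1.0, 0.0]
```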
# One-Hot Encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
#categoricalColumns = ["origin", "origin_state", "destination", "destination_state", "trip_identifier", "airline"]
categoricalColumns = ["origin", "origin_state", "destination", "destination_state", "trip_identifier"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
# Category Indexing with StringIndexer
stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
# Use OneHotEncoder to convert categorical variables into binary SparseVectors
encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
# Add stages. These are not run here, but will run all at once later on.
stages += [stringIndexer, encoder]
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "flight_status", outputCol = "label")
stages += [label_stringIdx]
# Transform all features into a vector using VectorAssembler
numericCols = ["distance"]
# Use a list comprehension (on Python 3, map() returns an iterator that cannot be concatenated with +)
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
# - fit() computes feature statistics as needed.
# - transform() actually transforms the features.
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
dataset.show()
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
Let's try using logistic regression to see if we can accurately predict if a flight will be delayed.
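Under the hood, logistic regression models the probability of the positive class as the sigmoid of a linear combination of the features. A toy sketch with made-up weights (a single illustrative "distance" feature, not coefficients from the trained model below):

```python
import math

def sigmoid(z):
    """Squash a real-valued score into a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_proba(weights, bias, features):
    """P(label = 1) under a logistic regression model."""
    z = bias + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)

# Toy example: one feature (distance), hypothetical weight and bias
p = predict_proba([0.002], -1.0, [500.0])
print(round(p, 3))  # z = -1.0 + 0.002 * 500 = 0, so p = 0.5
```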
from pyspark.ml.classification import LogisticRegression
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
# Train model with Training Data
lrModel = lr.fit(trainingData)
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)
selected = predictions.select("label", "prediction", "probability", "flight_status", "destination", "destination_state").where("destination = 'SEA'")
selected.show()
Use the BinaryClassificationEvaluator to evaluate our model.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)